Skip to content

Conversation

@ela-kotulska-frequenz
Copy link
Contributor

Documentation shows how to create Pipe inside context manager to cleanup the resources when context is exited.

@ela-kotulska-frequenz ela-kotulska-frequenz added the part:docs Affects the documentation label Apr 8, 2025
@ela-kotulska-frequenz ela-kotulska-frequenz self-assigned this Apr 8, 2025
Copilot AI review requested due to automatic review settings April 8, 2025 15:21
@ela-kotulska-frequenz ela-kotulska-frequenz requested a review from a team as a code owner April 8, 2025 15:21
@github-actions github-actions bot added the part:experimental Affects the experimental package label Apr 8, 2025
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.

Comments suppressed due to low confidence (1)

src/frequenz/channels/experimental/_pipe.py:33

  • The code snippet references 'Receiver[int]' without importing 'Receiver'. Consider adding the appropriate import (e.g., from frequenz.channels import Receiver) to avoid confusion.
async def create_forwarding_pipe(receiver: Receiver[int]) -> AsyncIterator[Broadcast[int]]:

@ela-kotulska-frequenz ela-kotulska-frequenz force-pushed the pipe_doc branch 2 times, most recently from 66559d7 to aca4789 Compare April 8, 2025 15:22
@ela-kotulska-frequenz ela-kotulska-frequenz requested review from cyiallou and shsms and removed request for cyiallou April 8, 2025 16:30
llucax
llucax previously approved these changes Apr 9, 2025
Copy link
Contributor

@llucax llucax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All comments are minor, I think the example is good enough to go as it is, just ideas of possible improvements.

from frequenz.channels import Broadcast, Pipe, Receiver
@asynccontextmanager
async def create_forwarding_pipe(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: I would try to use the pattern used by the Python library, like closing. Maybe call this forwarding_channel (as you are also returning the channel, not the pipe)?

Then:

        async with forwarding_channel(
            source_channel.new_receiver()
        ) as destination_channel:
            receiver = destination_channel.new_receiver()
            await source_sender.send(10)
            assert await receiver.receive() == 10

async with create_forwarding_pipe(
source_channel.new_receiver()
) as forwarding_channel:
receiver = forwarding_channel.new_receiver()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess here you don't close the receiver because you are closing the channel? Maybe this needs a note because it might not be obvious? I also wonder if, at least for this example, it wouldn't make more sense to return a new receiver instead of the whole channel, and explicitly close the receiver in the async context manager, to make it more clear/explicit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't close it because it goes with with not async with and I simply forgot...

RELEASE_NOTES.md Outdated
## Upgrading

- Some minimal dependencies have been bumped, so you might need to adjust your dependencies accordingly.
- Improve documentation of the frequenz.channels.experimental.Pipe
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not really an upgrade step, so I would put it into New Features (even if it is not a new feature either :D, sometimes I create a new section # Enhancements in these cases, but I think it can also be left out of the release notes, as I don't think anyone will go check out the Pipe docs just because it is in the changelog).

@llucax
Copy link
Contributor

llucax commented Apr 9, 2025

If we merge this, we can even simplify the example further and only use async with aclosing(...) 😉

@ela-kotulska-frequenz
Copy link
Contributor Author

ela-kotulska-frequenz commented Apr 16, 2025

Ok prepared 2 examples:

  1. with asynccontextmanager and all objects properly closed:
    Example:
        from contextlib import asynccontextmanager, closing, aclosing
        from typing import AsyncIterator
        from frequenz.channels import Broadcast, Pipe, Receiver

        @asynccontextmanager
        async def new_forwarding_channel(
            receiver: Receiver[int],
        ) -> AsyncIterator[Broadcast[int]]:
            forwarding_channel: Broadcast[int] = Broadcast(name="forwarded channel")
            forwarding_sender = forwarding_channel.new_sender()
            try:
                async with Pipe(receiver, forwarding_sender):
                    yield forwarding_channel
            finally:
                receiver.close()
                await forwarding_channel.aclose()

        async with (
            aclosing(Broadcast(name="source channel")) as source_channel,
            new_forwarding_channel(source_channel.new_receiver()) as forwarding_channel,
            ):
            with closing(forwarding_channel.new_receiver()) as receiver:
                source_sender = source_channel.new_sender()
                await source_sender.send(10)
                assert await receiver.receive() == 10
  1. Without asynccontextmanager and all object properly closed
        from contextlib import closing, aclosing
        from frequenz.channels import Broadcast, Pipe, Receiver

        async with aclosing(Broadcast(name="source channel")) as source_channel:
            with closing(source_channel.new_receiver()) as source_receiver:
                async with (
                    aclosing(Broadcast(name="forwarded channel")) as forwarding_channel,
                    Pipe(source_receiver, forwarding_channel.new_sender())
                    ):
                    with closing(forwarding_channel.new_receiver()) as receiver:
                        source_sender = source_channel.new_sender()
                        await source_sender.send(10)
                        assert await receiver.receive() == 10

The first one has more lines, but took me much less time to write and understand

@ela-kotulska-frequenz
Copy link
Contributor Author

It is a new version with AsyncExitStack :)

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (1)

src/frequenz/channels/experimental/_pipe.py:38

  • Consider using 'aclosing' instead of 'closing' when entering the async context for 'new_receiver()' (if it is an asynchronous context manager) to ensure proper async cleanup.
source_receiver = await stack.enter_async_context(closing(source_channel.new_receiver()))

Comment on lines 37 to 38
source_receiver = await stack.enter_async_context(
closing(source_channel.new_receiver()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this really works? AsycnExitStack() also provides the methods provided by ExitStack, so you should probably use enter_context() here that the context manager is sync.

Suggested change
source_receiver = await stack.enter_async_context(
closing(source_channel.new_receiver()))
source_receiver = await stack.enter_context(
closing(source_channel.new_receiver()))

Copy link
Contributor Author

@ela-kotulska-frequenz ela-kotulska-frequenz Apr 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea sorry I assumed the code in Example is executed. (because they are showed in the pytest summary as tests.
I just found out only imports are checked. Fixed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pylint is also ran, but the code itself isn't, nor mypy or other lint tools.

aclosing(Broadcast(name="forwarding channel")))
await stack.enter_async_context(Pipe(source_receiver, forwarding_channel.new_sender()))
receiver = await stack.enter_async_context(closing(forwarding_channel.new_receiver()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here.

Suggested change
receiver = await stack.enter_async_context(closing(forwarding_channel.new_receiver()))
receiver = await stack.enter_context(closing(forwarding_channel.new_receiver()))

@github-actions github-actions bot added the part:tests Affects the unit, integration and performance (benchmarks) tests label Apr 16, 2025
llucax
llucax previously approved these changes Apr 16, 2025
Copy link
Contributor

@llucax llucax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for taking care of this!

Documentation shows how to create Pipe inside context manager
to cleanup the resources when context is exited.

Signed-off-by: Elzbieta Kotulska <[email protected]>
@ela-kotulska-frequenz ela-kotulska-frequenz added this pull request to the merge queue Apr 17, 2025
Merged via the queue into frequenz-floss:v1.x.x with commit 9e67b90 Apr 17, 2025
5 checks passed
@ela-kotulska-frequenz ela-kotulska-frequenz deleted the pipe_doc branch April 17, 2025 06:52
@llucax llucax added this to the v1.8.0 milestone Jun 25, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

part:docs Affects the documentation part:experimental Affects the experimental package part:tests Affects the unit, integration and performance (benchmarks) tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants